package com.iu.distributed.lock.zk;

import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;

public class ZkDistributedLock extends ZkDistributedLockParent implements Lock {
	private final static Logger LOG = LoggerFactory.getLogger(ZkDistributedLock.class);

	private static Map<ZkClient, Map<String, LockKeyChangeListener>> zkDataListener = new HashMap<ZkClient, Map<String, LockKeyChangeListener>>();

	private ReentrantLock localLock = new ReentrantLock(true);
	private Condition localCondition = localLock.newCondition();
	private LockKeyChangeListener dataListener;
	private String lockKey;
	private Object initLock = new Object();
	private String localAddress;
	private String processName;
	private String lockPath;
	
	public ZkDistributedLock(IZkLockConfig config, String lockKey) {
		super(config);
		this.lockKey = lockKey;
		Preconditions.checkArgument(!Strings.isNullOrEmpty(lockKey), "ZkDistributedLock 需指定lockKey");
		Preconditions.checkArgument(lockKey.indexOf("/") == -1, "lockKey不可包函/");
		initLockPath();
		if (zkDataListener.get(super.getZkClient()) == null
				|| zkDataListener.get(super.getZkClient()).get(lockKey) == null) {
			synchronized (initLock) {
				String basePath = super.getConfig().getBasePath().replaceFirst("(.*)?/$", "$1");
				if (!super.getZkClient().exists(basePath)) {
					LOG.debug("分布式锁主路径不存在,准备创建!{}", basePath);
					super.getZkClient().createPersistent(basePath, "true");
				}
				LOG.debug("分布式锁主路径不存在,已创建!{}", super.getConfig().getBasePath());
				if (zkDataListener.get(super.getZkClient()) == null) {
					zkDataListener.put(super.getZkClient(), new HashMap<String, LockKeyChangeListener>());
				}
				if (zkDataListener.get(super.getZkClient()).get(lockKey) == null) {
					this.dataListener = new LockKeyChangeListener(this);
					zkDataListener.get(super.getZkClient()).put(lockKey, this.dataListener);
					super.getZkClient().subscribeDataChanges(lockPath, this.dataListener);
					LOG.debug("初始化锁路径监听,localPath:{}", this.lockPath);
				}
			}
		}
		try {
			localAddress = InetAddress.getLocalHost().getHostAddress();
			processName = ManagementFactory.getRuntimeMXBean().getName().split("@")[0];
		} catch (UnknownHostException e) {
			localAddress = ManagementFactory.getRuntimeMXBean().getName().split("@")[1];
		}
		this.dataListener = (LockKeyChangeListener) zkDataListener.get(super.getZkClient()).get(lockKey);
		dataListener.addDistributedLock(this);
	}
	protected boolean createRemoteLock() {
		String lockData = localAddress + "@" + processName + "@" + Thread.currentThread().getId();
		boolean res = false;
		try {
			super.getZkClient().create(lockPath, lockData + "@1", CreateMode.EPHEMERAL);
			LOG.debug("{}-{},获取锁成功!", new Object[] { lockData, lockPath });
			res = true;
		} catch (Exception ex) {
			String remoteLockData = super.getZkClient().readData(lockPath, true);
			if (remoteLockData != null && remoteLockData.startsWith(lockData)) {
				int count = Integer.parseInt(remoteLockData.split("@")[3]) + 1;
				super.getZkClient().writeData(lockPath, lockData + "@" + count);
				LOG.debug("{}-{},获取锁成功,获取次数:{}!", new Object[] { lockData, lockPath, count });
				res = true;
			}
		}
		return res;
	}
	protected void removeRemoteLock() {
		String lockData = localAddress + "@" + processName + "@" + Thread.currentThread().getId();
		String remoteLockData = super.getZkClient().readData(lockPath, true);
		if (remoteLockData != null && remoteLockData.startsWith(lockData)) {
			super.getZkClient().delete(lockPath);
			LOG.debug("{}-{},远程锁释放成功!", new Object[] { lockData, lockPath });
		}
		localLock.unlock();
	}
	@Override
	public void lock() {
		String lockData = localAddress + "@" + processName + "@" + Thread.currentThread().getId();
		for (;;) {
			LOG.debug("{}-{},准备获取锁!", new Object[] { lockData, lockPath });
			localLock.lock();
			boolean res = createRemoteLock();
			if(res){
				break;
			}
			try {
				LOG.debug("{}-{},获取锁失败,进入等待状态!", new Object[] { lockData, lockPath });
				long waitFlag = localCondition.awaitNanos(TimeUnit.MILLISECONDS.toNanos(super.getConfig().getRetryMaxTime() == null ? 300 : super.getConfig().getRetryMaxTime()));
				if(waitFlag>0){
					LOG.debug("{}-{},获取锁收到信息苏醒,重新获取!", new Object[] { lockData, lockPath });
				}else{
					LOG.debug("{}-{},获取锁自动苏醒,重新获取!", new Object[] { lockData, lockPath });
				}
			} catch (InterruptedException e) {
			}
		}
	}

	@Override
	public void lockInterruptibly() throws InterruptedException {
		String lockData = localAddress + "@" + processName + "@" + Thread.currentThread().getId();
		for (;;) {
			LOG.debug("{}-{},准备获取锁!", new Object[] { lockData, lockPath });
			localLock.lockInterruptibly();
			boolean res = createRemoteLock();
			if(res){
				break;
			}
			LOG.debug("{}-{},获取锁失败,进入等待状态!", new Object[] { lockData, lockPath });
			long waitFlag = localCondition.awaitNanos(TimeUnit.MILLISECONDS.toNanos(super.getConfig().getRetryMaxTime() == null ? 300 : super.getConfig().getRetryMaxTime()));
			if(waitFlag>0){
				LOG.debug("{}-{},获取锁收到信息苏醒,重新获取!", new Object[] { lockData, lockPath });
			}else{
				LOG.debug("{}-{},获取锁自动苏醒,重新获取!", new Object[] { lockData, lockPath });
			}
		}
	}

	@Override
	public boolean tryLock() {
		String lockData = localAddress + "@" + processName + "@" + Thread.currentThread().getId();
		LOG.debug("{}-{},准备获取锁!", new Object[] { lockData, lockPath });
		boolean res = localLock.tryLock();
		if (res) {
			res = createRemoteLock();
		}
		if(!res){
			LOG.debug("{}-{},获取锁失败!", new Object[] { lockData, lockPath });
		}
		return res;
	}

	@Override
	public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
		long lastTime = System.nanoTime();
		long now, offsetTime;
		boolean res = false;
		String lockData = localAddress + "@" + processName + "@" + Thread.currentThread().getId();
		for (;;) {
			now = System.nanoTime();
			offsetTime = now - lastTime;
			if (unit.toNanos(time) - offsetTime > 0) {
				LOG.debug("{}-{},准备获取锁!", new Object[] { lockData, lockPath });
				res = localLock.tryLock(unit.toNanos(time) - offsetTime, unit);
				if (res) {
					res = createRemoteLock();
					if(res){
						break;
					}
					now = System.nanoTime();
					offsetTime = unit.convert(now - lastTime, TimeUnit.NANOSECONDS);
					if (time - offsetTime > 0) {
						LOG.debug("{}-{},获取锁失败,进入等待状态!", new Object[] { lockData, lockPath });
						long waitFlag = localCondition.awaitNanos(unit.toNanos(time - offsetTime));
						if(waitFlag>0){
							LOG.debug("{}-{},获取锁收到信息苏醒,重新获取!", new Object[] { lockData, lockPath });
						}else{
							LOG.debug("{}-{},获取锁自动苏醒,重新获取!", new Object[] { lockData, lockPath });
						}
					}else{
						break;
					}
				} else {
					break;
				}
			} else {
				break;
			}
		}
		if(!res){
			LOG.debug("{}-{},获取锁失败!", new Object[] { lockData, lockPath });
		}
		return res;
	}

	@Override
	public void unlock() {
		localLock.lock();
		try {
			removeRemoteLock();
		} finally {
			localLock.unlock();
		}
	}
	/**
	 * 
	 * 该方法返回的Condition中的方法，当前仅对本地线程有效.
	 * await 仅释放本地锁，远程锁将继续保持直到调用unlock或应用中止
	 * @return   
	 * @see java.util.concurrent.locks.Lock#newCondition()
	 */
	@Override
	public Condition newCondition() {
		return localLock.newCondition();
	}

	public void destory() {
		LOG.debug("销毁锁,{}!", lockPath);
		this.dataListener.reomveDistributedLock(this);
		super.destory();
	}

	private void initLockPath() {
		if (super.getConfig().getBasePath() == null) {
			lockPath = "/";
		} else if (super.getConfig().getBasePath().endsWith("/")) {
			lockPath = super.getConfig().getBasePath();
		} else {
			lockPath = super.getConfig().getBasePath() + "/";
		}
		lockPath = lockPath + lockKey;
	}

	protected ReentrantLock getLocalLock() {
		return localLock;
	}

	protected Condition getLocalCondition() {
		return localCondition;
	}
	
	private class LockKeyChangeListener implements IZkDataListener {
		private Set<ZkDistributedLock> lockSet = new HashSet<ZkDistributedLock>();

		public void addDistributedLock(ZkDistributedLock lock) {
			lockSet.add(lock);
		}

		public void reomveDistributedLock(ZkDistributedLock lock) {
			lockSet.remove(lock);
		}

		public LockKeyChangeListener(ZkDistributedLock lock) {
			this.addDistributedLock(lock);
		}

		@Override
		public void handleDataChange(String dataPath, Object data) throws Exception {
			LOG.debug("远程锁发生变更,{}-{}!", new Object[] { lockPath, data });
		}

		@Override
		public void handleDataDeleted(String dataPath) throws Exception {
			LOG.debug("远程锁被释放,{}!", lockPath);
			for (ZkDistributedLock lock : lockSet) {
				lock.getLocalLock().lock();
				try {
					LOG.debug("通知本地线程获取锁,{}!", lockPath);
					lock.getLocalCondition().signalAll();
				} finally {
					lock.getLocalLock().unlock();
				}
			}
		}

	}
}
